Messaging & Integration Practice Exercises

Master message-driven architecture, event-driven patterns, and reliable integration strategies for distributed systems.

---

Foundational Questions

Q: Compare RabbitMQ and ZeroMQ for distributing price updates. When would you choose one over the other?

A: RabbitMQ is brokered: it provides persistence, flexible routing, acknowledgments, a management UI, and plugins. ZeroMQ is brokerless sockets: ultra-low latency, but you implement the patterns yourself and get no persistence out of the box. Choose RabbitMQ for durable messaging, complex routing, and enterprise integration where operators need visibility and security controls. Choose ZeroMQ for high-throughput, in-process or edge messaging; avoid it when you need persistence or central management.
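To make the brokerless side concrete, here is a minimal ZeroMQ-style price feed sketched with NetMQ (the C# port of ZeroMQ); the socket setup and the inproc address are illustrative, not a production topology:

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

// Brokerless PUB/SUB: the publisher binds a socket directly; no server process.
using var pub = new PublisherSocket();
using var sub = new SubscriberSocket();
pub.Bind("inproc://prices");        // in-process transport; tcp://*:5556 for real use
sub.Connect("inproc://prices");
sub.Subscribe("EURUSD");            // prefix filter on the first frame

// PUB/SUB is fire-and-forget: messages sent before the subscription propagates
// are dropped silently, so keep publishing until one gets through.
string topic = null, price = null;
for (var i = 0; i < 200 && topic == null; i++)
{
    pub.SendMoreFrame("EURUSD").SendFrame("1.0842");
    if (sub.TryReceiveFrameString(TimeSpan.FromMilliseconds(25), out topic))
        price = sub.ReceiveFrameString();
}
Console.WriteLine($"{topic} {price}");
```

Note what is missing: no persistence, no acknowledgments, no redelivery. That trade-off is exactly why a broker is the safer default for funding or order flows.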

Q: Explain how to ensure at-least-once delivery with RabbitMQ while preventing duplicate processing.

A: Use durable queues, persistent messages, manual ack, idempotent consumers. Enable publisher confirms to ensure the broker persisted the message before acknowledging to the producer.

channel.BasicConsume(queue, autoAck: false, consumer);
consumer.Received += (sender, ea) =>
{
    Handle(ea.Body);  // handler must be idempotent: redelivery after a crash means duplicates
    channel.BasicAck(ea.DeliveryTag, multiple: false);  // ack only after successful handling
};

Use when duplicates are tolerable but message loss is not. Avoid when exactly-once semantics are required—use a transactional outbox plus deduplication instead.
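Idempotent consumption can be enforced with a dedup store keyed by MessageId. A minimal in-memory sketch (the class name is illustrative; production code would use a database row or a Redis entry with a TTL):

```csharp
using System;
using System.Collections.Generic;

public class IdempotentHandler
{
    // Ids of messages already handled; swap for a persistent store in production.
    private readonly HashSet<string> _seen = new();
    private readonly object _gate = new();

    // Returns true if the message was processed, false if it was a duplicate.
    public bool TryProcess(string messageId, Action handle)
    {
        lock (_gate)
        {
            if (_seen.Contains(messageId))
                return false;       // redelivery: skip the work, but still ack
        }

        handle();                   // may throw: id stays unmarked, broker redelivers

        lock (_gate) _seen.Add(messageId);
        return true;
    }
}
```

In the consumer above, `Handle(ea.Body)` would run inside `TryProcess`, keyed by `ea.BasicProperties.MessageId`.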

Q: How would you design a saga pattern to coordinate account funding across multiple services?

A: Orchestrator or choreography; manage compensations (reverse ledger entry, refund payment).

public async Task Handle(FundAccount command)
{
    var transferId = await _payments.DebitAsync(command.PaymentId);
    try
    {
        await _ledger.CreditAsync(command.AccountId, command.Amount);
        await _notifications.SendAsync(command.AccountId, "Funding complete");
    }
    catch
    {
        await _payments.RefundAsync(transferId);
        throw;
    }
}

Use for multi-step transactions that span services. Avoid when a single system handles all steps—a plain ACID transaction suffices.

Q: Discuss the outbox pattern and how it prevents message loss in event-driven systems.

A: Write the domain event to an outbox table within the same transaction as the state change, then relay it to the message bus. A background dispatcher polls the outbox table, publishes events, and marks them as processed (with retries and exponential backoff).

await using var tx = await db.Database.BeginTransactionAsync();
order.Status = OrderStatus.Accepted;
db.Outbox.Add(new OutboxMessage(order.Id, new OrderAccepted(order.Id)));
await db.SaveChangesAsync();
await tx.CommitAsync();

Use when the database update and the message publish must be atomic. Avoid when the service has no transactional database to piggyback on, or when occasional message loss is acceptable and the extra table is not worth it.

---

RabbitMQ Advanced Patterns

Q: Implement a publisher with confirmation to ensure messages are persisted.

A: Use publisher confirms for reliability.

public class ReliableRabbitMqPublisher
{
    private readonly IConnection _connection;
    private readonly ILogger<ReliableRabbitMqPublisher> _logger;

    public ReliableRabbitMqPublisher(
        ConnectionFactory factory,
        ILogger<ReliableRabbitMqPublisher> logger)
    {
        _connection = factory.CreateConnection();
        _logger = logger;
    }

    public async Task PublishAsync<T>(string exchange, string routingKey, T message)
    {
        using var channel = _connection.CreateModel();

        // Enable publisher confirms
        channel.ConfirmSelect();

        // Declare exchange as durable
        channel.ExchangeDeclare(
            exchange: exchange,
            type: ExchangeType.Topic,
            durable: true,
            autoDelete: false);

        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;  // Make message persistent
        properties.MessageId = Guid.NewGuid().ToString();
        properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());

        channel.BasicPublish(
            exchange: exchange,
            routingKey: routingKey,
            basicProperties: properties,
            body: body);

        // Wait for confirmation
        var confirmed = channel.WaitForConfirms(TimeSpan.FromSeconds(5));

        if (!confirmed)
        {
            _logger.LogError("Message {MessageId} was not confirmed by broker", properties.MessageId);
            throw new InvalidOperationException("Message publish failed - not confirmed");
        }

        _logger.LogInformation("Message {MessageId} published and confirmed", properties.MessageId);
    }

    public async Task PublishBatchAsync<T>(string exchange, string routingKey, List<T> messages)
    {
        using var channel = _connection.CreateModel();
        channel.ConfirmSelect();

        channel.ExchangeDeclare(
            exchange: exchange,
            type: ExchangeType.Topic,
            durable: true,
            autoDelete: false);

        // Publish all messages in batch
        foreach (var message in messages)
        {
            var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.MessageId = Guid.NewGuid().ToString();

            channel.BasicPublish(
                exchange: exchange,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);
        }

        // Wait for all confirms
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(30));
        _logger.LogInformation("Batch of {Count} messages published and confirmed", messages.Count);
    }
}

Q: Implement a resilient consumer with retry logic and dead letter queue.

A: Handle failures with retries and DLQ.

public class ResilientRabbitMqConsumer
{
    private readonly IConnection _connection;
    private readonly ILogger<ResilientRabbitMqConsumer> _logger;

    public void StartConsuming<T>(
        string queueName,
        Func<T, Task> messageHandler,
        int maxRetries = 3)
    {
        var channel = _connection.CreateModel();

        // Declare main queue
        var mainQueueArgs = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", $"{queueName}.dlx" },
            { "x-dead-letter-routing-key", $"{queueName}.dlq" }
        };

        channel.QueueDeclare(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: mainQueueArgs);

        // Declare dead letter exchange and queue
        channel.ExchangeDeclare($"{queueName}.dlx", ExchangeType.Direct, durable: true);
        channel.QueueDeclare($"{queueName}.dlq", durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind($"{queueName}.dlq", $"{queueName}.dlx", $"{queueName}.dlq");

        // Declare retry queue with TTL
        var retryQueueArgs = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", "" },  // Default exchange
            { "x-dead-letter-routing-key", queueName },
            { "x-message-ttl", 5000 }  // 5 second delay
        };

        channel.QueueDeclare(
            queue: $"{queueName}.retry",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: retryQueueArgs);

        // Note: the async lambda below runs as async void on EventingBasicConsumer;
        // production code should prefer AsyncEventingBasicConsumer with
        // DispatchConsumersAsync = true on the ConnectionFactory.
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (sender, ea) =>
        {
            try
            {
                var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                var message = JsonSerializer.Deserialize<T>(body);

                await messageHandler(message);

                // Success - acknowledge
                channel.BasicAck(ea.DeliveryTag, multiple: false);
                _logger.LogInformation("Message {MessageId} processed successfully", ea.BasicProperties.MessageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing message {MessageId}", ea.BasicProperties.MessageId);

                // Check retry count
                var retryCount = GetRetryCount(ea.BasicProperties);

                if (retryCount < maxRetries)
                {
                    // Send to retry queue
                    _logger.LogInformation(
                        "Retrying message {MessageId} (attempt {Attempt}/{MaxRetries})",
                        ea.BasicProperties.MessageId,
                        retryCount + 1,
                        maxRetries);

                    var retryProperties = channel.CreateBasicProperties();
                    retryProperties.Persistent = true;
                    retryProperties.MessageId = ea.BasicProperties.MessageId;
                    retryProperties.Headers = ea.BasicProperties.Headers ?? new Dictionary<string, object>();
                    retryProperties.Headers["x-retry-count"] = retryCount + 1;

                    channel.BasicPublish(
                        exchange: "",
                        routingKey: $"{queueName}.retry",
                        basicProperties: retryProperties,
                        body: ea.Body);

                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                }
                else
                {
                    // Max retries exceeded - reject to DLQ
                    _logger.LogError(
                        "Message {MessageId} exceeded max retries, sending to DLQ",
                        ea.BasicProperties.MessageId);

                    channel.BasicReject(ea.DeliveryTag, requeue: false);
                }
            }
        };

        channel.BasicConsume(
            queue: queueName,
            autoAck: false,
            consumer: consumer);

        _logger.LogInformation("Started consuming from queue: {QueueName}", queueName);
    }

    private int GetRetryCount(IBasicProperties properties)
    {
        if (properties.Headers != null &&
            properties.Headers.TryGetValue("x-retry-count", out var value))
        {
            return Convert.ToInt32(value);
        }
        return 0;
    }
}
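The retry queue above applies a fixed 5-second delay to every attempt. A common refinement is exponential backoff via per-message TTL (the `Expiration` property, which RabbitMQ expects as a string of milliseconds), with one caveat: RabbitMQ only expires messages at the head of a queue, so mixing different delays in a single retry queue can leave short TTLs stuck behind long ones—use one retry queue per delay tier if that matters. The schedule itself is a pure function; base and cap values here are illustrative:

```csharp
using System;

public static class RetryBackoff
{
    // Exponential backoff: 5s, 10s, 20s, ... capped at 5 minutes.
    // The result feeds IBasicProperties.Expiration (a string of milliseconds).
    public static string ExpirationMs(int retryCount, int baseMs = 5_000, int capMs = 300_000)
    {
        var delay = baseMs * Math.Pow(2, retryCount);
        return ((int)Math.Min(delay, capMs)).ToString();
    }
}
```

In the consumer above this would become `retryProperties.Expiration = RetryBackoff.ExpirationMs(retryCount);` before publishing to the retry queue.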

Q: Implement priority queue pattern for urgent messages.

A: Use RabbitMQ priority queues.

public class PriorityQueuePublisher
{
    private readonly IModel _channel;

    public PriorityQueuePublisher(IConnection connection)
    {
        _channel = connection.CreateModel();

        // Declare priority queue
        var args = new Dictionary<string, object>
        {
            { "x-max-priority", 10 }
        };

        _channel.QueueDeclare(
            queue: "orders.priority",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: args);
    }

    public void PublishOrder(Order order, int priority)
    {
        var properties = _channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.Priority = (byte)Math.Clamp(priority, 0, 10);  // clamp to the queue's 0-10 range

        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order));

        _channel.BasicPublish(
            exchange: "",
            routingKey: "orders.priority",
            basicProperties: properties,
            body: body);
    }
}

// Usage
publisher.PublishOrder(urgentOrder, priority: 10);    // High priority
publisher.PublishOrder(normalOrder, priority: 5);     // Normal priority
publisher.PublishOrder(bulkOrder, priority: 1);       // Low priority

---

Kafka Integration

Q: Implement Kafka producer with idempotent writes and transactions.

A: Use Kafka transactional producer.

public class TransactionalKafkaProducer : IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<TransactionalKafkaProducer> _logger;

    public TransactionalKafkaProducer(IConfiguration configuration, ILogger<TransactionalKafkaProducer> logger)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            // Must be stable across restarts so the broker can fence zombie producers
            TransactionalId = configuration["Kafka:TransactionalId"],
            EnableIdempotence = true,  // required for transactions; dedupes broker-side retries
            Acks = Acks.All,           // wait for all in-sync replicas
            MaxInFlight = 5,
            MessageSendMaxRetries = 10,
            RetryBackoffMs = 100
        };

        _producer = new ProducerBuilder<string, string>(config).Build();
        _producer.InitTransactions(TimeSpan.FromSeconds(30));
        _logger = logger;
    }

    public async Task PublishInTransactionAsync(
        Dictionary<string, List<Message<string, string>>> messagesByTopic)
    {
        _producer.BeginTransaction();

        try
        {
            var deliveryTasks = new List<Task<DeliveryResult<string, string>>>();

            foreach (var (topic, messages) in messagesByTopic)
            {
                foreach (var message in messages)
                {
                    var task = _producer.ProduceAsync(topic, message);
                    deliveryTasks.Add(task);
                }
            }

            // Wait for all messages to be sent
            var results = await Task.WhenAll(deliveryTasks);

            _producer.CommitTransaction();

            _logger.LogInformation("Transaction committed with {Count} messages", results.Length);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Transaction failed, aborting");
            _producer.AbortTransaction();
            throw;
        }
    }

    public void Dispose()
    {
        _producer?.Dispose();
    }
}

Q: Implement Kafka consumer with manual offset management and exactly-once processing.

A: Use consumer with manual commit and idempotency.

public class ExactlyOnceKafkaConsumer
{
    private readonly IConsumer<string, string> _consumer;
    private readonly DbContext _dbContext;
    private readonly ILogger<ExactlyOnceKafkaConsumer> _logger;

    public ExactlyOnceKafkaConsumer(
        IConfiguration configuration,
        DbContext dbContext,
        ILogger<ExactlyOnceKafkaConsumer> logger)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            GroupId = configuration["Kafka:ConsumerGroup"],
            EnableAutoCommit = false,  // Manual commit
            AutoOffsetReset = AutoOffsetReset.Earliest,
            IsolationLevel = IsolationLevel.ReadCommitted  // Only read committed messages
        };

        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _dbContext = dbContext;
        _logger = logger;
    }

    public async Task StartConsumingAsync(
        string topic,
        Func<string, Task> messageHandler,
        CancellationToken cancellationToken)
    {
        _consumer.Subscribe(topic);

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var consumeResult = _consumer.Consume(cancellationToken);

                // Check if already processed (idempotency)
                var messageId = consumeResult.Message.Key;
                var alreadyProcessed = await _dbContext.ProcessedMessages
                    .AnyAsync(m => m.MessageId == messageId, cancellationToken);

                if (alreadyProcessed)
                {
                    _logger.LogInformation("Message {MessageId} already processed, skipping", messageId);
                    _consumer.Commit(consumeResult);
                    continue;
                }

                await using var transaction = await _dbContext.Database.BeginTransactionAsync(cancellationToken);

                try
                {
                    // Process message
                    await messageHandler(consumeResult.Message.Value);

                    // Record processed message
                    _dbContext.ProcessedMessages.Add(new ProcessedMessage
                    {
                        MessageId = messageId,
                        ProcessedAt = DateTime.UtcNow,
                        Partition = consumeResult.Partition.Value,
                        Offset = consumeResult.Offset.Value
                    });

                    await _dbContext.SaveChangesAsync(cancellationToken);
                    await transaction.CommitAsync(cancellationToken);

                    // Commit offset to Kafka
                    _consumer.Commit(consumeResult);

                    _logger.LogInformation(
                        "Processed message {MessageId} at offset {Offset}",
                        messageId,
                        consumeResult.Offset.Value);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing message {MessageId}", messageId);
                    await transaction.RollbackAsync(cancellationToken);
                    throw;
                }
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Kafka consume error");
            }
        }
    }
}

Q: Implement Kafka consumer group rebalancing with state management.

A: Handle partition assignment and revocation.

public class StatefulKafkaConsumer
{
    private readonly IConsumer<string, string> _consumer;
    private readonly Dictionary<int, long> _partitionOffsets = new();
    private readonly ILogger<StatefulKafkaConsumer> _logger;

    public StatefulKafkaConsumer(IConfiguration configuration, ILogger<StatefulKafkaConsumer> logger)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            GroupId = configuration["Kafka:ConsumerGroup"],
            EnableAutoCommit = false
        };

        _consumer = new ConsumerBuilder<string, string>(config)
            .SetPartitionsAssignedHandler(OnPartitionsAssigned)
            .SetPartitionsRevokedHandler(OnPartitionsRevoked)
            .Build();

        _logger = logger;
    }

    private void OnPartitionsAssigned(
        IConsumer<string, string> consumer,
        List<TopicPartition> partitions)
    {
        _logger.LogInformation("Partitions assigned: {Partitions}",
            string.Join(", ", partitions.Select(p => p.Partition.Value)));

        // Load state for assigned partitions
        foreach (var partition in partitions)
        {
            // Could load from database, cache, etc.
            _partitionOffsets[partition.Partition.Value] = 0;
        }
    }

    private void OnPartitionsRevoked(
        IConsumer<string, string> consumer,
        List<TopicPartitionOffset> partitions)
    {
        _logger.LogInformation("Partitions revoked: {Partitions}",
            string.Join(", ", partitions.Select(p => p.Partition.Value)));

        // Save state before losing partitions
        foreach (var partition in partitions)
        {
            var offset = _partitionOffsets.GetValueOrDefault(partition.Partition.Value);
            _logger.LogInformation("Saving offset {Offset} for partition {Partition}",
                offset, partition.Partition.Value);

            // Could save to database, cache, etc.
        }

        // Commit offsets before rebalance
        consumer.Commit(partitions);

        // Clear local state
        foreach (var partition in partitions)
        {
            _partitionOffsets.Remove(partition.Partition.Value);
        }
    }
}

---

Saga Pattern

Q: Implement orchestration-based saga for order processing.

A: Centralized orchestrator manages saga flow.

public class OrderSagaOrchestrator
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IMessageBus _messageBus;
    private readonly ISagaRepository _sagaRepository;
    private readonly ILogger<OrderSagaOrchestrator> _logger;

    public async Task<Guid> StartSagaAsync(CreateOrderCommand command)
    {
        var sagaId = Guid.NewGuid();
        var saga = new OrderSaga
        {
            Id = sagaId,
            State = SagaState.Started,
            Command = command,
            CreatedAt = DateTime.UtcNow
        };

        await _sagaRepository.SaveAsync(saga);

        // Start saga execution
        await ExecuteSagaStepAsync(sagaId, OrderSagaStep.ReserveInventory);

        return sagaId;
    }

    private async Task ExecuteSagaStepAsync(Guid sagaId, OrderSagaStep step)
    {
        var saga = await _sagaRepository.GetAsync(sagaId);

        try
        {
            switch (step)
            {
                case OrderSagaStep.ReserveInventory:
                    await ReserveInventoryAsync(saga);
                    saga.CurrentStep = OrderSagaStep.ProcessPayment;
                    await _sagaRepository.SaveAsync(saga);  // persist progress before the next step
                    await ExecuteSagaStepAsync(sagaId, OrderSagaStep.ProcessPayment);
                    break;

                case OrderSagaStep.ProcessPayment:
                    await ProcessPaymentAsync(saga);
                    saga.CurrentStep = OrderSagaStep.CreateShipment;
                    await _sagaRepository.SaveAsync(saga);
                    await ExecuteSagaStepAsync(sagaId, OrderSagaStep.CreateShipment);
                    break;

                case OrderSagaStep.CreateShipment:
                    await CreateShipmentAsync(saga);
                    saga.State = SagaState.Completed;
                    await _sagaRepository.SaveAsync(saga);
                    await _messageBus.PublishAsync(new OrderSagaCompletedEvent { SagaId = sagaId });
                    break;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Saga step {Step} failed for saga {SagaId}", step, sagaId);
            saga.State = SagaState.Compensating;
            await _sagaRepository.SaveAsync(saga);
            await CompensateSagaAsync(sagaId, step);
        }
    }

    private async Task CompensateSagaAsync(Guid sagaId, OrderSagaStep failedStep)
    {
        var saga = await _sagaRepository.GetAsync(sagaId);

        _logger.LogWarning("Starting compensation for saga {SagaId} at step {Step}", sagaId, failedStep);

        // Compensate in reverse order
        switch (failedStep)
        {
            case OrderSagaStep.CreateShipment:
                await CancelPaymentAsync(saga);
                goto case OrderSagaStep.ProcessPayment;

            case OrderSagaStep.ProcessPayment:
                await ReleaseInventoryAsync(saga);
                break;
        }

        saga.State = SagaState.Compensated;
        await _sagaRepository.SaveAsync(saga);
        await _messageBus.PublishAsync(new OrderSagaFailedEvent { SagaId = sagaId });
    }

    private async Task ReserveInventoryAsync(OrderSaga saga)
    {
        using var scope = _serviceProvider.CreateScope();
        var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();

        saga.ReservationId = await inventoryService.ReserveAsync(
            saga.Command.Items,
            TimeSpan.FromMinutes(10));

        await _sagaRepository.SaveAsync(saga);
    }

    private async Task ReleaseInventoryAsync(OrderSaga saga)
    {
        if (saga.ReservationId.HasValue)
        {
            using var scope = _serviceProvider.CreateScope();
            var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();
            await inventoryService.ReleaseAsync(saga.ReservationId.Value);
        }
    }

    private async Task ProcessPaymentAsync(OrderSaga saga)
    {
        using var scope = _serviceProvider.CreateScope();
        var paymentService = scope.ServiceProvider.GetRequiredService<IPaymentService>();

        saga.PaymentId = await paymentService.ChargeAsync(
            saga.Command.CustomerId,
            saga.Command.TotalAmount);

        await _sagaRepository.SaveAsync(saga);
    }

    private async Task CancelPaymentAsync(OrderSaga saga)
    {
        if (saga.PaymentId.HasValue)
        {
            using var scope = _serviceProvider.CreateScope();
            var paymentService = scope.ServiceProvider.GetRequiredService<IPaymentService>();
            await paymentService.RefundAsync(saga.PaymentId.Value);
        }
    }

    private async Task CreateShipmentAsync(OrderSaga saga)
    {
        using var scope = _serviceProvider.CreateScope();
        var shippingService = scope.ServiceProvider.GetRequiredService<IShippingService>();

        saga.ShipmentId = await shippingService.CreateShipmentAsync(
            saga.Id,
            saga.Command.ShippingAddress);

        await _sagaRepository.SaveAsync(saga);
    }
}

public class OrderSaga
{
    public Guid Id { get; set; }
    public SagaState State { get; set; }
    public OrderSagaStep CurrentStep { get; set; }
    public CreateOrderCommand Command { get; set; }
    public Guid? ReservationId { get; set; }
    public Guid? PaymentId { get; set; }
    public Guid? ShipmentId { get; set; }
    public DateTime CreatedAt { get; set; }
}

public enum SagaState
{
    Started,
    Compensating,
    Compensated,
    Completed
}

public enum OrderSagaStep
{
    ReserveInventory,
    ProcessPayment,
    CreateShipment
}

Q: Implement choreography-based saga using events.

A: Decentralized saga coordination through events.

// Each service publishes events and listens for relevant events

// Inventory Service
public class InventoryService
{
    private readonly IMessageBus _messageBus;

    public async Task HandleOrderCreatedAsync(OrderCreatedEvent evt)
    {
        try
        {
            var reservationId = await ReserveInventoryAsync(evt.Items);

            await _messageBus.PublishAsync(new InventoryReservedEvent
            {
                OrderId = evt.OrderId,
                ReservationId = reservationId,
                Items = evt.Items
            });
        }
        catch (Exception ex)
        {
            await _messageBus.PublishAsync(new InventoryReservationFailedEvent
            {
                OrderId = evt.OrderId,
                Reason = ex.Message
            });
        }
    }

    public async Task HandlePaymentFailedAsync(PaymentFailedEvent evt)
    {
        // Compensate: release inventory
        await ReleaseInventoryAsync(evt.ReservationId);

        await _messageBus.PublishAsync(new InventoryReleasedEvent
        {
            OrderId = evt.OrderId,
            ReservationId = evt.ReservationId
        });
    }
}

// Payment Service
public class PaymentService
{
    private readonly IMessageBus _messageBus;

    public async Task HandleInventoryReservedAsync(InventoryReservedEvent evt)
    {
        try
        {
            var paymentId = await ProcessPaymentAsync(evt.OrderId);

            await _messageBus.PublishAsync(new PaymentProcessedEvent
            {
                OrderId = evt.OrderId,
                PaymentId = paymentId,
                ReservationId = evt.ReservationId
            });
        }
        catch (Exception ex)
        {
            await _messageBus.PublishAsync(new PaymentFailedEvent
            {
                OrderId = evt.OrderId,
                ReservationId = evt.ReservationId,
                Reason = ex.Message
            });
        }
    }

    public async Task HandleShipmentFailedAsync(ShipmentFailedEvent evt)
    {
        // Compensate: refund payment
        await RefundPaymentAsync(evt.PaymentId);

        await _messageBus.PublishAsync(new PaymentRefundedEvent
        {
            OrderId = evt.OrderId,
            PaymentId = evt.PaymentId
        });
    }
}

// Shipping Service
public class ShippingService
{
    private readonly IMessageBus _messageBus;

    public async Task HandlePaymentProcessedAsync(PaymentProcessedEvent evt)
    {
        try
        {
            var shipmentId = await CreateShipmentAsync(evt.OrderId);

            await _messageBus.PublishAsync(new ShipmentCreatedEvent
            {
                OrderId = evt.OrderId,
                ShipmentId = shipmentId,
                PaymentId = evt.PaymentId
            });
        }
        catch (Exception ex)
        {
            await _messageBus.PublishAsync(new ShipmentFailedEvent
            {
                OrderId = evt.OrderId,
                PaymentId = evt.PaymentId,
                Reason = ex.Message
            });
        }
    }
}

// Order Service - tracks overall saga state
public class OrderService
{
    public async Task HandleShipmentCreatedAsync(ShipmentCreatedEvent evt)
    {
        // Saga completed successfully
        await UpdateOrderStatusAsync(evt.OrderId, OrderStatus.Shipped);
    }

    public async Task HandleInventoryReservationFailedAsync(InventoryReservationFailedEvent evt)
    {
        // Saga failed at first step
        await UpdateOrderStatusAsync(evt.OrderId, OrderStatus.Cancelled);
    }

    public async Task HandlePaymentRefundedAsync(PaymentRefundedEvent evt)
    {
        // Saga fully compensated
        await UpdateOrderStatusAsync(evt.OrderId, OrderStatus.Cancelled);
    }
}

---

Outbox Pattern

Q: Implement transactional outbox pattern with background processor.

A: Ensure atomic database updates and message publishing.

// Outbox entity
public class OutboxMessage
{
    public Guid Id { get; set; }
    public string AggregateId { get; set; }
    public string EventType { get; set; }
    public string Payload { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }
    public int RetryCount { get; set; }
    public string Error { get; set; }
}

// Domain event handler
public class OrderCommandHandler
{
    private readonly DbContext _dbContext;

    public async Task HandleCreateOrderAsync(CreateOrderCommand command)
    {
        await using var transaction = await _dbContext.Database.BeginTransactionAsync();

        try
        {
            // 1. Update domain entities
            var order = new Order
            {
                Id = Guid.NewGuid(),
                CustomerId = command.CustomerId,
                Items = command.Items,
                Status = OrderStatus.Pending
            };

            _dbContext.Orders.Add(order);

            // 2. Write to outbox
            var orderCreatedEvent = new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                Items = order.Items
            };

            var outboxMessage = new OutboxMessage
            {
                Id = Guid.NewGuid(),
                AggregateId = order.Id.ToString(),
                EventType = nameof(OrderCreatedEvent),
                Payload = JsonSerializer.Serialize(orderCreatedEvent),
                CreatedAt = DateTime.UtcNow
            };

            _dbContext.OutboxMessages.Add(outboxMessage);

            // 3. Commit transaction (atomic!)
            await _dbContext.SaveChangesAsync();
            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

// Background outbox processor
public class OutboxProcessor : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OutboxProcessor> _logger;
    private readonly TimeSpan _processingInterval = TimeSpan.FromSeconds(5);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Outbox Processor started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessOutboxMessagesAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing outbox messages");
            }

            await Task.Delay(_processingInterval, stoppingToken);
        }
    }

    private async Task ProcessOutboxMessagesAsync(CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<DbContext>();
        var messageBus = scope.ServiceProvider.GetRequiredService<IMessageBus>();

        // Get unprocessed messages
        var messages = await dbContext.OutboxMessages
            .Where(m => m.ProcessedAt == null && m.RetryCount < 5)
            .OrderBy(m => m.CreatedAt)
            .Take(100)
            .ToListAsync(cancellationToken);

        foreach (var message in messages)
        {
            try
            {
                // Publish to message bus
                await messageBus.PublishRawAsync(
                    message.EventType,
                    message.Payload,
                    cancellationToken);

                // Mark as processed
                message.ProcessedAt = DateTime.UtcNow;

                _logger.LogInformation(
                    "Published outbox message {MessageId} of type {EventType}",
                    message.Id,
                    message.EventType);
            }
            catch (Exception ex)
            {
                message.RetryCount++;
                message.Error = ex.Message;

                _logger.LogError(
                    ex,
                    "Failed to publish outbox message {MessageId} (retry {RetryCount})",
                    message.Id,
                    message.RetryCount);
            }
        }

        if (messages.Any())
        {
            await dbContext.SaveChangesAsync(cancellationToken);
        }
    }
}
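
The handler and processor above assume an outbox entity roughly like the following. The exact column set is an assumption inferred from the properties used; adapt the names to your schema. An index on (ProcessedAt, CreatedAt) keeps the polling query cheap.

```csharp
using System;

public class OutboxMessage
{
    public Guid Id { get; set; }
    public string AggregateId { get; set; }
    public string EventType { get; set; }
    public string Payload { get; set; }          // JSON-serialized event body
    public DateTime CreatedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }   // null = not yet published
    public int RetryCount { get; set; }
    public string Error { get; set; }            // last publish failure, if any
}
```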

---

Idempotency

Q: Implement idempotency using distributed cache.

A: Track processed requests to prevent duplicates.

public class IdempotencyMiddleware
{
    private readonly RequestDelegate _next;
    private readonly IDistributedCache _cache;
    private readonly ILogger<IdempotencyMiddleware> _logger;

    public IdempotencyMiddleware(RequestDelegate next, IDistributedCache cache, ILogger<IdempotencyMiddleware> logger)
    {
        _next = next;
        _cache = cache;
        _logger = logger;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        // Only handle POST/PUT/PATCH
        if (context.Request.Method != "POST" &&
            context.Request.Method != "PUT" &&
            context.Request.Method != "PATCH")
        {
            await _next(context);
            return;
        }

        // Get idempotency key
        if (!context.Request.Headers.TryGetValue("Idempotency-Key", out var idempotencyKey) ||
            string.IsNullOrEmpty(idempotencyKey))
        {
            context.Response.StatusCode = StatusCodes.Status400BadRequest;
            await context.Response.WriteAsJsonAsync(new { error = "Idempotency-Key header required" });
            return;
        }

        var cacheKey = $"idempotency:{idempotencyKey}";

        // Check if request already processed
        var cachedResponse = await _cache.GetStringAsync(cacheKey);
        if (cachedResponse != null)
        {
            _logger.LogInformation("Returning cached response for idempotency key: {Key}", idempotencyKey);

            var response = JsonSerializer.Deserialize<IdempotentResponse>(cachedResponse);
            context.Response.StatusCode = response.StatusCode;
            context.Response.ContentType = "application/json";

            foreach (var header in response.Headers)
            {
                context.Response.Headers[header.Key] = header.Value;
            }

            await context.Response.WriteAsync(response.Body);
            return;
        }

        // Acquire lock to prevent concurrent processing
        var lockKey = $"{cacheKey}:lock";
        var lockAcquired = await TryAcquireLockAsync(lockKey, TimeSpan.FromSeconds(30));

        if (!lockAcquired)
        {
            context.Response.StatusCode = StatusCodes.Status409Conflict;
            await context.Response.WriteAsJsonAsync(new
            {
                error = "Request with this idempotency key is currently being processed"
            });
            return;
        }

        // Capture the response so it can be cached and replayed
        var originalBodyStream = context.Response.Body;
        using var responseBody = new MemoryStream();
        context.Response.Body = responseBody;

        try
        {
            await _next(context);

            // Cache successful responses only
            if (context.Response.StatusCode >= 200 && context.Response.StatusCode < 300)
            {
                responseBody.Seek(0, SeekOrigin.Begin);
                var body = await new StreamReader(responseBody).ReadToEndAsync();
                responseBody.Seek(0, SeekOrigin.Begin);

                var idempotentResponse = new IdempotentResponse
                {
                    StatusCode = context.Response.StatusCode,
                    Headers = context.Response.Headers.ToDictionary(h => h.Key, h => h.Value.ToString()),
                    Body = body
                };

                await _cache.SetStringAsync(
                    cacheKey,
                    JsonSerializer.Serialize(idempotentResponse),
                    new DistributedCacheEntryOptions
                    {
                        AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
                    });
            }

            await responseBody.CopyToAsync(originalBodyStream);
        }
        finally
        {
            // Always restore the original stream and release the lock,
            // even if the downstream pipeline throws
            context.Response.Body = originalBodyStream;
            await ReleaseLockAsync(lockKey);
        }
    }

    private async Task<bool> TryAcquireLockAsync(string lockKey, TimeSpan timeout)
    {
        var expiry = DateTime.UtcNow.Add(timeout);

        while (DateTime.UtcNow < expiry)
        {
            // NOTE: get-then-set is not atomic; two callers can race here. With Redis,
            // prefer an atomic SET NX (e.g. StackExchange.Redis LockTakeAsync) in production.
            var acquired = await _cache.GetStringAsync(lockKey) == null;
            if (acquired)
            {
                await _cache.SetStringAsync(lockKey, "locked", new DistributedCacheEntryOptions
                {
                    AbsoluteExpirationRelativeToNow = timeout
                });
                return true;
            }

            await Task.Delay(100);
        }

        return false;
    }

    private async Task ReleaseLockAsync(string lockKey)
    {
        await _cache.RemoveAsync(lockKey);
    }

    private class IdempotentResponse
    {
        public int StatusCode { get; set; }
        public Dictionary<string, string> Headers { get; set; }
        public string Body { get; set; }
    }
}
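
A sketch of wiring the middleware into an ASP.NET Core pipeline (minimal-hosting style; the endpoint is illustrative). The in-memory cache only works for a single instance — use a shared store such as Redis (AddStackExchangeRedisCache) when multiple instances must share idempotency state.

```csharp
var builder = WebApplication.CreateBuilder(args);

// In-memory cache for local development; a shared distributed cache
// is required for idempotency across multiple service instances
builder.Services.AddDistributedMemoryCache();

var app = builder.Build();
app.UseMiddleware<IdempotencyMiddleware>();
app.MapPost("/orders", () => Results.Ok(new { id = Guid.NewGuid() }));
app.Run();
```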

---

Event Streaming

Q: Implement event store with snapshots for performance.

A: Optimize event replay with periodic snapshots.

public class EventStoreWithSnapshots
{
    private readonly DbContext _dbContext;
    private readonly int _snapshotInterval = 100; // Snapshot every 100 events

    public EventStoreWithSnapshots(DbContext dbContext) => _dbContext = dbContext;

    public async Task<T> LoadAggregateAsync<T>(Guid aggregateId) where T : Aggregate, new()
    {
        // Try to load latest snapshot
        var snapshot = await _dbContext.Snapshots
            .Where(s => s.AggregateId == aggregateId)
            .OrderByDescending(s => s.Version)
            .FirstOrDefaultAsync();

        T aggregate;
        long fromVersion;

        if (snapshot != null)
        {
            // Deserialize snapshot
            aggregate = JsonSerializer.Deserialize<T>(snapshot.Data);
            fromVersion = snapshot.Version + 1;
        }
        else
        {
            aggregate = new T();
            fromVersion = 0;
        }

        // Load events after snapshot
        var events = await _dbContext.Events
            .Where(e => e.AggregateId == aggregateId && e.Version >= fromVersion)
            .OrderBy(e => e.Version)
            .ToListAsync();

        foreach (var eventRecord in events)
        {
            var @event = DeserializeEvent(eventRecord);
            aggregate.ApplyEvent(@event);
        }

        return aggregate;
    }

    public async Task SaveAggregateAsync<T>(T aggregate) where T : Aggregate
    {
        var uncommittedEvents = aggregate.GetUncommittedEvents();

        foreach (var @event in uncommittedEvents)
        {
            _dbContext.Events.Add(new EventRecord
            {
                AggregateId = aggregate.Id,
                EventType = @event.GetType().Name,
                Data = JsonSerializer.Serialize(@event),
                Version = @event.Version,
                Timestamp = @event.Timestamp
            });
        }

        await _dbContext.SaveChangesAsync();

        // Snapshot once the version crosses an interval boundary; comparing pre- and
        // post-save versions catches batches of events that skip past the exact multiple
        var versionBefore = aggregate.Version - uncommittedEvents.Count();
        if (aggregate.Version / _snapshotInterval > versionBefore / _snapshotInterval)
        {
            await CreateSnapshotAsync(aggregate);
        }

        aggregate.MarkEventsAsCommitted();
    }

    private async Task CreateSnapshotAsync<T>(T aggregate) where T : Aggregate
    {
        var snapshot = new SnapshotRecord
        {
            AggregateId = aggregate.Id,
            AggregateType = typeof(T).Name,
            Version = aggregate.Version,
            Data = JsonSerializer.Serialize(aggregate),
            CreatedAt = DateTime.UtcNow
        };

        _dbContext.Snapshots.Add(snapshot);
        await _dbContext.SaveChangesAsync();

        // Clean up old snapshots (keep last 3)
        var oldSnapshots = await _dbContext.Snapshots
            .Where(s => s.AggregateId == aggregate.Id)
            .OrderByDescending(s => s.Version)
            .Skip(3)
            .ToListAsync();

        _dbContext.Snapshots.RemoveRange(oldSnapshots);
        await _dbContext.SaveChangesAsync();
    }
}
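
The store above assumes an aggregate base class along these lines. The member names match the calls in LoadAggregateAsync/SaveAggregateAsync, but the shape is an assumption — your event base type may carry different metadata.

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent
{
    long Version { get; set; }
    DateTime Timestamp { get; set; }
}

public abstract class Aggregate
{
    private readonly List<IDomainEvent> _uncommitted = new();

    public Guid Id { get; protected set; }
    public long Version { get; protected set; }

    // Replay a historical event: mutate state and advance the version
    public void ApplyEvent(IDomainEvent @event)
    {
        When(@event);
        Version = @event.Version;
    }

    // Raise a new event: stamp it, apply it, and queue it for persistence
    protected void Raise(IDomainEvent @event)
    {
        @event.Version = Version + 1;
        @event.Timestamp = DateTime.UtcNow;
        ApplyEvent(@event);
        _uncommitted.Add(@event);
    }

    public IReadOnlyList<IDomainEvent> GetUncommittedEvents() => _uncommitted;
    public void MarkEventsAsCommitted() => _uncommitted.Clear();

    // Each concrete aggregate implements its state transitions here
    protected abstract void When(IDomainEvent @event);
}
```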

---

Advanced Messaging Scenarios

Q: How do you handle poison messages without blocking a queue?

A: Use a dead-letter queue (DLQ) and track retry attempts via message headers (in RabbitMQ, the broker-maintained x-death header records prior dead-letterings).

if (retryCount >= 5)
{
    await _dlqPublisher.PublishAsync(message);
    return;
}
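
Dead-lettering can also be configured at the queue level so rejected or expired messages route automatically. A sketch using the RabbitMQ .NET client (queue and exchange names are placeholders; the x-* argument keys are RabbitMQ's):

```csharp
channel.QueueDeclare(
    queue: "orders",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        // Messages nacked with requeue:false (or expired) are
        // republished to this exchange with this routing key
        ["x-dead-letter-exchange"] = "dlx",
        ["x-dead-letter-routing-key"] = "orders.dead"
    });
```

With this in place, a consumer can simply `BasicNack(deliveryTag, multiple: false, requeue: false)` once the retry budget is exhausted, instead of publishing to the DLQ by hand.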

Q: Describe an idempotency strategy for message consumers.

A: Use an idempotency-key table with a unique constraint: insert the message ID before (or atomically with) processing, and skip any message whose insert is rejected as a duplicate. A TTL or periodic cleanup job keeps the table small.

CREATE TABLE message_dedup (
    message_id TEXT PRIMARY KEY,
    processed_at TIMESTAMP NOT NULL
);
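
On the consumer side, a minimal sketch against that table (assumes PostgreSQL and the Npgsql driver; ON CONFLICT DO NOTHING makes a duplicate insert affect zero rows, so the row count tells you whether this is the first delivery):

```csharp
using Npgsql;
using System.Threading.Tasks;

public static class MessageDedup
{
    // Returns true if this message ID has not been seen before
    public static async Task<bool> TryMarkProcessedAsync(NpgsqlConnection conn, string messageId)
    {
        const string sql = @"
            INSERT INTO message_dedup (message_id, processed_at)
            VALUES (@id, now())
            ON CONFLICT (message_id) DO NOTHING;";

        await using var cmd = new NpgsqlCommand(sql, conn);
        cmd.Parameters.AddWithValue("id", messageId);
        return await cmd.ExecuteNonQueryAsync() == 1; // 0 rows = duplicate, skip it
    }
}
```

Run the insert in the same database transaction as the business write where possible, so the dedup mark and the side effect commit or roll back together.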

Q: How would you preserve ordering for a specific key in Kafka?

A: Use the key as the partition key and run a single consumer per partition.
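
A minimal producer sketch using the Confluent.Kafka client (package and broker address are assumptions): all messages with the same key hash to the same partition, and enabling the idempotent producer prevents broker-side retries from reordering them.

```csharp
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    EnableIdempotence = true // preserves per-partition ordering across producer retries
};

using var producer = new ProducerBuilder<string, string>(config).Build();

// Same key ("EURUSD") => same partition => ordered delivery for that instrument
await producer.ProduceAsync("price-updates", new Message<string, string>
{
    Key = "EURUSD",
    Value = "{\"bid\":1.0842,\"ask\":1.0844}"
});
```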

Q: How do you evolve a message schema safely?

A: Use backward-compatible changes, versioned fields, and schema registry validation. Avoid breaking field removals.
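
One common additive pattern, sketched below: new fields are optional with safe defaults, old fields are never removed or repurposed, and a schema version travels with the message (field names here are illustrative):

```csharp
using System;

// v1 consumers ignore unknown fields; v2 adds an optional field with a safe default
public class PriceUpdated
{
    public int SchemaVersion { get; set; } = 2;
    public string Symbol { get; set; }
    public decimal Bid { get; set; }
    public decimal Ask { get; set; }

    // Added in v2; nullable so v1 payloads deserialize cleanly
    public DateTime? ExchangeTimestamp { get; set; }
}
```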

Q: Implement a retry policy with exponential backoff and max delay.

A: Increase the delay per attempt, cap the maximum, and add jitter so retrying clients don't all wake up at once.

var exp = Math.Min(attempt, 8);                                  // cap the exponent to avoid int overflow
var baseDelay = Math.Min(30_000, 200 * (1 << exp));              // 200ms, 400ms, ... capped at 30s
var delayMs = baseDelay / 2 + Random.Shared.Next(baseDelay / 2); // jitter: 50-100% of the base delay
await Task.Delay(delayMs, ct);

---

Total Exercises: 35+

Master messaging patterns for building resilient, event-driven distributed systems!